#!/usr/bin/python3
# ******************************************************************************
# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved.
# licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#     http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# ******************************************************************************/
import os
import re
from collections import defaultdict
from typing import Dict, List, Tuple, Optional, Set

from ceres.conf.constant import CommandExitCode, TaskExecuteRes, CveFixTaskType
from ceres.function.check import PreCheck
from ceres.function.log import LOGGER
from ceres.function.status import *
from ceres.function.util import execute_shell_command
from ceres.manages.collect_manage import Collect


class VulnerabilityManage:
    def repo_set(self, data: dict) -> int:
        """
        Save the repo source to local, and do simple verification.

        The destination must be a "<name>.repo" file located directly in /etc/yum.repos.d/.
        The repo content is written to that file and every repo id found in the content is
        then validated; when the validation fails the file is removed again.

        Args:
            data (dict): e.g
                {
                    "repo_info": {
                        "name": "string",
                        "dest": "save location",
                        "repo_content": "repo source info"
                    },
                    "check_items": ["string"],
                    "check": false
                }

        Returns:
            int: status code. PARAM_ERROR when "repo_info" is missing or its "dest" is not an
                allowed path, REPO_CONTENT_INCORRECT when no repo id can be extracted from the
                content or the saved repo fails validation, SUCCESS otherwise.
        """
        repo_info = data.get("repo_info")
        if not isinstance(repo_info, dict):
            LOGGER.debug('Incorrect repo save path.')
            return PARAM_ERROR

        repo_path = repo_info.get("dest")
        # fullmatch with escaped dots: an unescaped "." matches any character, and "$" would
        # also accept a path that ends with a newline.
        if not isinstance(repo_path, str) or re.fullmatch(r"/etc/yum\.repos\.d/[\w-]+\.repo", repo_path) is None:
            LOGGER.debug('Incorrect repo save path.')
            return PARAM_ERROR

        content = repo_info.get("repo_content")
        # Commented-out lines are dropped first so that "[section]" text inside comments is not
        # mistaken for a repo id.
        repo_id_list = (
            re.findall(r'\[([^\]]+)\]', re.sub(r'^\s*#.*$', '', content, flags=re.MULTILINE))
            if isinstance(content, str)
            else []
        )
        if not repo_id_list:
            LOGGER.warning("Failed to extract repo id information, please check the repo content.")
            return REPO_CONTENT_INCORRECT

        with open(repo_path, 'w', encoding='utf8') as repo_file:
            repo_file.write(content)
            LOGGER.info(f'Repo source {repo_info.get("name")} has been saved to {repo_path}.')

        if self._validate_repo_source(repo_id_list):
            LOGGER.info('Repo source set succeed.')
            return SUCCESS
        # The repo cannot be queried, so do not leave an unusable repo file behind.
        os.remove(repo_path)
        LOGGER.warning("Repo source can't be used, it has been deleted.")
        return REPO_CONTENT_INCORRECT

    @staticmethod
    def _validate_repo_source(repo_id_list: List[str]) -> bool:
        """
        A sample validate which repo can used by yum.

        Args:
            repo_id(list): repo id list

        Returns:
            bool
        """

        def query_repo_info(repo_id: str) -> bool:
            """
            Verify the validity of the repo source by querying the repo source information.

            Args:
                repo_id(str)

            Returns:
                bool
            """
            code, _, stderr = execute_shell_command([f"dnf repoinfo --repo {repo_id}"])
            if code == CommandExitCode.SUCCEED:
                return True
            LOGGER.warning(f"Failed to query repo information with repo id {repo_id}.")
            LOGGER.warning(stderr)
            return False

        validate_result = True
        for repo_id in repo_id_list:
            validate_result = validate_result and query_repo_info(repo_id)
        return validate_result

    def cve_scan(self, cve_scan_args: dict) -> Tuple[str, dict]:
        """
        Query vulnerability info in the machine.

        The pre-check items are executed first. When they pass, the installed rpm info is
        collected and the unfixed / fixed CVEs are queried, preferring the dnf hotpatch plugin
        and falling back to plain dnf when the plugin query yields nothing.

        Args:
            cve_scan_args(dict): e.g
                {
                    check_items: ["network"]
                }

            check_items: Items that need to be checked before execution.

        Returns:
            status code: PRE_CHECK_ERROR when the pre-check fails, SUCCESS otherwise
            dict: e.g
                {
                    "check_items": [{
                        "item": "network",
                        "result":False,
                        "log":"check log"
                        }],
                    "unfixed_cves": [{
                        "cve_id": "CVE-2023-1513",
                        "installed_rpm": "kernel-4.19.90-2304.1.0.0131.oe1.x86_64",
                        "available_rpm": "kernel-4.19.90-2304.1.0.0196.oe1.x86_64",
                        "support_way": "coldpatch",
                        }],
                    "fixed_cves": [
                        {
                            "cve_id": "CVE-2023-1112",
                            "installed_rpm":"redis-4.2.5-1.oe2203.x86_64",
                            "fix_way": "hotpatch",
                            "hp_status": "ACCEPTED" //only hotpatch has the field
                        },
                        {
                            "cve_id": "CVE-2023-1112",
                            "installed_rpm":"redis-4.2.5-1.oe2203.x86_64",
                            "fix_way": "coldpatch"
                        }
                    ]
                }
            When the pre-check fails only "check_items" is present.
        """
        precheck_passed, precheck_log = PreCheck.execute_check(cve_scan_args.get("check_items"))
        if not precheck_passed:
            LOGGER.info("The pre-check is failed before execute command!")
            return PRE_CHECK_ERROR, {"check_items": precheck_log}

        # State shared with the query helpers below.
        self.installed_rpm_info = self._query_installed_rpm()
        self.available_hotpatch_key_set = set()

        # The unfixed query runs first: it fills available_hotpatch_key_set, which is read
        # while the fixed CVEs are queried through the plugin.
        unfixed_cves = self._query_unfixed_cves_by_dnf_plugin()
        if not unfixed_cves:
            unfixed_cves = self._query_unfixed_cves_by_dnf()

        fixed_cves = self._query_fixed_cves_by_dnf_plugin()
        if not fixed_cves:
            fixed_cves = self._query_fixed_cves_by_dnf()

        return SUCCESS, {
            "check_items": precheck_log,
            "unfixed_cves": unfixed_cves,
            "fixed_cves": fixed_cves,
        }

    @staticmethod
    def _query_installed_rpm() -> Dict[str, str]:
        """
        Query installed rpm package info (only packages whose output line contains "kernel").

        Returns:
            dict: rpm name mapped to its full "name-version-release.arch" string. The "kernel"
                entry is always overwritten with the currently running kernel (or "" when
                the running kernel version cannot be determined). An empty dict is returned
                when the query command fails or prints nothing. e.g
                {
                    "kernel":"kernel-5.10.0-60.92.0.116.oe2203.aarch64"
                }
        """
        rpm_info_dict = {}
        # Example of command execution result:
        # openldap:openldap-2.4.50-6.oe1.x86_64
        # kernel:kernel-4.19.90-2310.3.0.0222.oe1.x86_64
        # systemtap-runtime:systemtap-runtime-4.3-2.oe1.x86_64
        # perl-Net-SSLeay:perl-Net-SSLeay-1.88-5.oe1.x86_64
        # powertop:powertop-2.9-12.oe1.x86_64
        # libusbx:libusbx-1.0.23-1.oe1.x86_64
        code, stdout, _ = execute_shell_command(
            ["rpm -qa --queryformat '%{NAME}:%{NAME}-%{VERSION}-%{RELEASE}.%{ARCH}\n'", "grep kernel"]
        )
        if code != CommandExitCode.SUCCEED or not stdout:
            LOGGER.error("query installed packages info failed!")
            return rpm_info_dict

        for line in stdout.splitlines():
            rpm_name, separator, rpm_info = line.partition(":")
            if not separator:
                # Not a "name:nevra" line; skip it instead of failing on the unpack.
                continue
            rpm_info_dict[rpm_name] = rpm_info

        # Several kernels may be installed at once; report the one that is actually running.
        current_kernel_version = Collect.get_current_kernel_version()
        rpm_info_dict["kernel"] = f"kernel-{current_kernel_version}" if current_kernel_version else ""

        LOGGER.debug("query installed rpm package info succeed!")
        return rpm_info_dict

    def _query_unfixed_cves_by_dnf(self) -> list:
        """
        Parse unfixed kernel vulnerability info by dnf (coldpatch).

        Return:
            list: cve info, empty when the command fails or nothing matches. e.g
                [{
                    "cve_id": "CVE-2023-1513",
                    "installed_rpm": "kernel-4.19.90-2304.1.0.0131.oe1.x86_64",
                    "available_rpm": "kernel-4.19.90-2304.1.0.0196.oe1.x86_64",
                    "support_way": "coldpatch",
                }]

        """
        # Example of command execution result:
        # Last metadata expiration check: 0:26:36 ago on Mon 07 Aug 2023 10:26:32 AM CST.
        # CVE-2021-43976  Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64
        # CVE-2021-0941   Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64
        # CVE-2021-45469  Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64
        # CVE-2021-44733  Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64
        exit_code, output, error_output = execute_shell_command(["dnf updateinfo list cves", "grep kernel"])
        if exit_code != CommandExitCode.SUCCEED:
            LOGGER.error("query unfixed cve info failed by dnf!")
            LOGGER.error(error_output)
            return []

        # Each match is a (cve id, severity, available kernel rpm) tuple, e.g.
        # ("CVE-2021-43976", "Important/Sec.", "kernel-4.19.90-2201.1.0.0132.oe1.x86_64")
        matched_rows = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+)", output)

        return [
            {
                "cve_id": cve_id,
                # Dropping the trailing "-version-release.arch" leaves the rpm name.
                "installed_rpm": self.installed_rpm_info.get(available_rpm.rsplit("-", 2)[0]),
                "available_rpm": available_rpm,
                "support_way": "coldpatch",
            }
            for cve_id, _, available_rpm in matched_rows
        ]

    def _query_unfixed_cves_by_dnf_plugin(self) -> list:
        """
        Parse unfixed kernel vulnerability info by dnf hotpatch plugin (hotpatch and coldpatch).

        For every matched row:
          - a "hotpatch" entry is added when the row has a hotpatch (not de-duplicated), and
            its key is recorded in self.available_hotpatch_key_set;
          - a "coldpatch" entry is added once per "<cve id>-<rpm name>" key when the row has a
            coldpatch;
          - an entry with support_way None is added once per key when the row has neither.

        Return:
            list: cve info, empty when the command fails or nothing matches. e.g
                [{
                    "cve_id": "CVE-2023-1513",
                    "installed_rpm": "kernel-4.19.90-2304.1.0.0131.oe1.x86_64",
                    "available_rpm": "kernel-4.19.90-2304.1.0.0196.oe1.x86_64",
                    "support_way": "coldpatch",
                }]
        """

        def build_record(cve: str, package: str, way: Optional[str], target_rpm: str) -> dict:
            # One entry of the returned list; target_rpm is the hotpatch or coldpatch rpm.
            return {
                "cve_id": cve,
                "installed_rpm": self.installed_rpm_info.get(package),
                "available_rpm": target_rpm,
                "support_way": way,
            }

        # Example of command execution result:
        # Last metadata expiration check: 0:31:50 ago on Mon 07 Aug 2023 10:26:32 AM CST.
        # CVE-2023-1981   Moderate/Sec.  avahi-libs-0.8-9.oe1.x86_64                     -
        # CVE-2021-42574  Important/Sec. binutils-2.34-19.oe1.x86_64                     -
        # CVE-2023-1513   Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         patch-kernel-4.19.90-2112...
        exit_code, output, error_output = execute_shell_command(["dnf hot-updateinfo list cves", "grep kernel"])
        if exit_code != CommandExitCode.SUCCEED:
            LOGGER.error("query unfixed cve info failed by dnf!")
            LOGGER.error(error_output)
            return []

        # Each match is a (cve id, severity, coldpatch or "-", hotpatch or "-") tuple, e.g.
        # ("CVE-2023-1513", "Important/Sec.", "kernel-4.19.90-2304.1.0.0196.oe1.x86_64", "patch-kernel-4.19.90-2112..")
        matched_rows = re.findall(
            r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+|-)\s+(patch-kernel-\d\S+|-)", output
        )

        records = []
        seen_coldpatch_keys, seen_unpatchable_keys = set(), set()
        for cve_id, _, coldpatch, hotpatch in matched_rows:
            has_coldpatch = coldpatch != "-"
            has_hotpatch = hotpatch != "-"

            rpm_name = coldpatch.rsplit("-", 2)[0]
            if has_hotpatch and not has_coldpatch:
                # Without a coldpatch the rpm name has to be taken from the hotpatch name.
                try:
                    # Example of hotpatch rpm name:
                    # patch-redis-6.2.5-1-SGL_CVE_2023_1111_CVE_2023_1112-1-1.x86_64
                    rpm_name = hotpatch.rsplit("-", 5)[0].split("-", 1)[1]
                except IndexError as error:
                    LOGGER.warning(error)
                    rpm_name = ""
            key = f"{cve_id}-{rpm_name}"

            if has_hotpatch:
                records.append(build_record(cve_id, rpm_name, "hotpatch", hotpatch))
                self.available_hotpatch_key_set.add(key)

            if has_coldpatch:
                if key not in seen_coldpatch_keys:
                    records.append(build_record(cve_id, rpm_name, "coldpatch", coldpatch))
                    seen_coldpatch_keys.add(key)
            elif not has_hotpatch and key not in seen_unpatchable_keys:
                records.append(build_record(cve_id, rpm_name, None, coldpatch))
                seen_unpatchable_keys.add(key)

        return records

    def _query_fixed_cves_by_dnf(self) -> list:
        """
        Parse the fixed kernel vulnerability info by dnf.

        Only CVEs whose fixing kernel package is not newer than the currently running kernel
        are reported, because a newer kernel may be installed without being the running one.

        Return:
            list: cve info, empty when the running kernel version is unknown, the command
                fails or nothing matches. e.g
                [
                    {"cve_id": "CVE-XXXX-XXXX","installed_rpm": "kernel-version-release.arch", "fix_way":"coldpatch"},
                ]

        """

        def segments(text: str) -> list:
            # Split into digit / letter runs (separators are ignored). Numeric runs compare as
            # integers and rank above alphabetic runs, following rpm's version ordering.
            return [(1, int(part)) if part.isdigit() else (0, part) for part in re.findall(r"[0-9]+|[A-Za-z]+", text)]

        def kernel_version_key(kernel_rpm: str) -> tuple:
            # "kernel-<version>-<release>.<arch>" -> (version segments, release segments).
            version, _, release = kernel_rpm.partition("-")[2].partition("-")
            return segments(version), segments(release)

        # Example of command execution result:
        # Last metadata expiration check: 0:26:36 ago on Mon 07 Aug 2023 10:26:32 AM CST.
        # CVE-2021-43976  Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64
        # CVE-2021-0941   Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64
        # CVE-2021-45469  Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64
        # CVE-2021-44733  Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64
        fixed_cves = []
        current_kernel_version = Collect.get_current_kernel_version()
        if not current_kernel_version:
            return fixed_cves
        current_kernel_key = kernel_version_key(f"kernel-{current_kernel_version}")

        code, stdout, stderr = execute_shell_command(["dnf updateinfo list cves --installed", "grep kernel"])
        if code != CommandExitCode.SUCCEED:
            LOGGER.error("query fixed cve info failed!")
            LOGGER.error(stderr)
            return fixed_cves

        # Example of regex matching result:
        # [
        # ("CVE-2021-43976","Important/Sec.", "kernel-4.19.90-2201.1.0.0132.oe1.x86_64"),
        # ("CVE-2021-0941","Important/Sec.", "kernel-4.19.90-2201.1.0.0132.oe1.x86_64")
        # ]
        fixed_cves_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+)", stdout)

        for cve_id, _, coldpatch in fixed_cves_info:
            # Compare versions segment by segment: a plain string comparison would rank
            # e.g. "kernel-5.10.0-60..." above "kernel-5.10.0-136...".
            if kernel_version_key(coldpatch) <= current_kernel_key:
                fixed_cves.append(
                    {
                        "cve_id": cve_id,
                        "installed_rpm": self.installed_rpm_info.get(coldpatch.rsplit("-", 2)[0]),
                        "fix_way": "coldpatch",
                    }
                )
        return fixed_cves

    def _query_fixed_cves_by_dnf_plugin(self) -> list:
        """
        Parse the fixed kernel vulnerability info by dnf plugin.

        Rows without a hotpatch are reported as fixed by coldpatch when the listed kernel
        package is not newer than the currently running kernel. Rows with a hotpatch are
        reported as fixed by hotpatch; for accumulative ("ACC") hotpatches the highest
        hotpatch of the same series is reported, and "hp_status" is looked up from the
        applied hotpatch status ("" when unknown).

        Return:
            list: coldpatch entries followed by hotpatch entries, empty when the running
                kernel version is unknown or the command fails. e.g
                [{"cve_id": "CVE-XXXX-XXXX", "fix_way": "hotpatch", "hp_status": "ACCEPTED", "installed_rpm":"xxxx"}]

        """

        def segments(text: str) -> list:
            # Split into digit / letter runs (separators are ignored). Numeric runs compare as
            # integers and rank above alphabetic runs, following rpm's version ordering.
            return [(1, int(part)) if part.isdigit() else (0, part) for part in re.findall(r"[0-9]+|[A-Za-z]+", text)]

        def kernel_version_key(kernel_rpm: str) -> tuple:
            # "kernel-<version>-<release>.<arch>" -> (version segments, release segments).
            version, _, release = kernel_rpm.partition("-")[2].partition("-")
            return segments(version), segments(release)

        # Example of command execution result:
        # Last metadata expiration check: 0:31:50 ago on Mon 07 Aug 2023 10:26:32 AM CST.
        # CVE-2023-1981   Moderate/Sec.  avahi-libs-0.8-9.oe1.x86_64                     -
        # CVE-2021-42574  Important/Sec. binutils-2.34-19.oe1.x86_64                     -
        # CVE-2023-1513   Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         patch-kernel-4.19.90-2112...
        current_kernel_version = Collect.get_current_kernel_version()
        if not current_kernel_version:
            return []
        current_kernel_key = kernel_version_key(f"kernel-{current_kernel_version}")

        code, stdout, stderr = execute_shell_command(["dnf hot-updateinfo list cves --installed", "grep kernel"])
        if code != CommandExitCode.SUCCEED:
            LOGGER.error("query fixed cve info failed by dnf plugin!")
            LOGGER.error(stderr)
            return []

        # Example of regex matching result:
        # [
        # ("CVE-2023-1513", "Important/Sec.", "kernel-4.19.90-2304.1.0.0196.oe1.x86_64", "patch-kernel-4.19.90-2112.."),
        # ("CVE-2021-xxxx", "Important/Sec.", "-", "patch-redis-6.2.5-1-SGL_CVE_2023_1111_CVE_2023_1112-1-1.x86_64")
        # ]
        hotpatch_status = self._query_applied_hotpatch_status()
        all_cve_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+|-)\s+(patch-kernel-\d\S+|-)", stdout)

        cve_info_fixed_by_coldpatch, cve_info_fixed_by_hotpatch, hotpatch_dic = [], [], defaultdict(str)
        for cve_id, _, coldpatch, hotpatch in all_cve_info:
            if hotpatch == "-":
                # Compare versions segment by segment: a plain string comparison would rank
                # e.g. "kernel-5.10.0-60..." above "kernel-5.10.0-136...".
                if kernel_version_key(coldpatch) > current_kernel_key:
                    continue
                cve_info_fixed_by_coldpatch.append(
                    {
                        "cve_id": cve_id,
                        "installed_rpm": self.installed_rpm_info.get(coldpatch.rsplit("-", 2)[0]),
                        "fix_way": "coldpatch",
                    }
                )
            else:
                cve_info_fixed_by_hotpatch.append({"cve_id": cve_id, "fix_way": "hotpatch", "installed_rpm": hotpatch})

                # Dropping "-<version>-<release>.<arch>" leaves the hotpatch series name.
                hotpatch_dic_key = hotpatch.rsplit("-", 2)[0]
                if hotpatch_dic_key.endswith("ACC"):
                    # Keep the highest accumulative hotpatch of the series; the numeric parts
                    # are compared as numbers so that e.g. "ACC-1-10" outranks "ACC-1-9".
                    hotpatch_dic[hotpatch_dic_key] = max(
                        hotpatch, hotpatch_dic.get(hotpatch_dic_key, hotpatch), key=segments
                    )

        for cve_info in cve_info_fixed_by_hotpatch:
            hotpatch_dic_key = cve_info["installed_rpm"].rsplit("-", 2)[0]

            if hotpatch_dic_key in hotpatch_dic:
                cve_info["installed_rpm"] = hotpatch_dic[hotpatch_dic_key]
            # The status dict is keyed by the hotpatch name without the trailing ".<arch>".
            cve_info["hp_status"] = hotpatch_status.get(cve_info["installed_rpm"].rsplit(".", 1)[0], "")

        return cve_info_fixed_by_coldpatch + cve_info_fixed_by_hotpatch

    def _query_applied_hotpatch_status(self) -> Dict[str, str]:
        """
        Query applied hotpatches with their status.

        Only rows whose base package matches the "kernel-<digit>..." pattern are considered.
        A row is recorded when its "<cve id>-<rpm name>" key is not in
        self.available_hotpatch_key_set, its status is ACTIVED or ACCEPTED, and no row with
        the same key has been recorded before.

        Return:
            dict: key is hotpatch name, value is its status. e.g {"patch-redis-6.2.5-1-ACC-1-3": "ACTIVED"}.
                Empty when the command fails or nothing matches.

        """
        # Example of command execution result:
        # Last metadata expiration check: 0:28:36 ago on Mon 07 Aug 2023 10:26:32 AM CST.
        # CVE-id        base-pkg/hotpatch                                                 status
        # CVE-2023-1111 redis-6.2.5-1/ACC-1-1/redis-benchmark                             ACTIVED
        # CVE-2023-1112 redis-6.2.5-1/ACC-1-1/redis-benchmark                             ACTIVED
        # CVE-2023-1111 redis-6.2.5-1/ACC-1-1/redis-server                                NOT-APPLIED
        # CVE-2023-2221 redis-6.2.5-1/ACC-1-2/redis-cli                                   NOT-APPLIED
        # CVE-2023-1111 redis-6.2.5-1/SGL_CVE_2023_1111_CVE_2023_1112-1-1/redis-benchmark NOT-APPLIED
        # CVE-2023-1112 redis-6.2.5-1/SGL_CVE_2023_1111_CVE_2023_1112-1-1/redis-server    NOT-APPLIED
        exit_code, output, error_output = execute_shell_command(["dnf hotpatch --list cves"])
        if exit_code != CommandExitCode.SUCCEED:
            LOGGER.error("query applied hotpatch info failed!")
            LOGGER.error(error_output)
            return {}

        # Each match is a (cve id, "base-pkg/hotpatch/target", status) tuple.
        matched_rows = re.findall(r"(CVE-\d{4}-\d+)\s+(kernel-\d[\w\-/.]+)\s+([A-W]+)", output)

        status_by_hotpatch = {}
        handled_keys = set()
        for cve_id, patch_name, status in matched_rows:
            package = patch_name.split("-", 1)[0]
            record_key = f"{cve_id}-{package}"

            # Refer to this example, the CVE can be marked as fixed only if all hotpatch are applied.
            # CVE-id        base-pkg/hotpatch                                                 status
            # CVE-2023-1111 redis-6.2.5-1/ACC-1-1/redis-benchmark                             ACTIVED
            # CVE-2023-1111 redis-6.2.5-1/ACC-1-1/redis-cli                                   ACTIVED
            # CVE-2023-1111 redis-6.2.5-1/ACC-1-1/redis-server                                NOT-APPLIED
            if record_key in self.available_hotpatch_key_set:
                continue
            if status not in ("ACTIVED", "ACCEPTED"):
                continue
            if record_key in handled_keys:
                continue

            # "base-pkg/hotpatch/target" -> "patch-<base-pkg>-<hotpatch>"
            base_and_hotpatch = patch_name.rsplit('/', 1)[0]
            status_by_hotpatch[f"patch-{base_and_hotpatch.replace('/', '-')}"] = status
            handled_keys.add(record_key)
        return status_by_hotpatch

    def cve_fix(self, task_info: dict) -> dict:
        """
        Fix cves by upgrading packages.

        The pre-check items are executed first; when they fail every rpm is reported as
        failed. Otherwise the rpms are upgraded as coldpatch or hotpatch depending on
        "fix_type". For hotpatch the latest dnf transaction id is recorded before and after
        the upgrade; both are reset to None when their difference does not equal the number
        of upgrades performed.

        Args:
            task_info(dict): cve info which need to fix and check_items,e.g.
            {
                "fix_type": "coldpatch",
                "check_items": [],
                "rpms": [
                    {
                        "installed_rpm": "xxxxx",
                        "available_rpm": "unzip-6.0-50.oe2203.x86_64",
                    }
                ],
                "accepted": False,
            }

        Returns:
            dict: cve fix result e.g
                {
                    "check_items":[
                        {
                            "item":"network",
                            "result":true,
                            "log":"xxxx"
                        }
                    ],
                    "rpms":[
                        {
                            "available_rpm":"kernel-4.19xxx",
                            "result": "succeed",
                            "log": "fix succeed"
                        }
                    ],
                    "dnf_event_start": 1,
                    "dnf_event_end": 5,
                    "status": succeed
                }
                "dnf_event_start" / "dnf_event_end" are only present for hotpatch tasks.
        """
        target_rpms = [item.get("available_rpm") for item in task_info["rpms"]]
        precheck_passed, precheck_log = PreCheck.execute_check(task_info["check_items"])
        fix_result = {"check_items": precheck_log}

        if not precheck_passed:
            LOGGER.warning("The pre-check is failed before execute command!")
            fix_result["rpms"] = [
                {"available_rpm": rpm, "result": TaskExecuteRes.FAIL, "log": "pre-check items check failed"}
                for rpm in target_rpms
            ]
            fix_result["status"] = TaskExecuteRes.FAIL
            return fix_result

        if task_info["fix_type"] == CveFixTaskType.COLDPATCH:
            fix_result["status"], fix_result["rpms"] = self._update_coldpatch_by_dnf_plugin(target_rpms)
            return fix_result

        # The implementation of the hotpatch upgrade and rollback plan relies on the dnf transaction,
        # so the dnf transaction ID information needs to be returned after the repair is completed.
        first_transaction_id = self._query_latest_dnf_transaction_id()
        fix_result["dnf_event_start"] = first_transaction_id

        status, rpm_records, transaction_count = self._update_hotpatch_by_dnf_plugin(
            target_rpms, task_info["accepted"]
        )
        fix_result["status"] = status
        fix_result["rpms"] = rpm_records

        last_transaction_id = self._query_latest_dnf_transaction_id()
        fix_result["dnf_event_end"] = last_transaction_id
        if last_transaction_id - first_transaction_id != transaction_count:
            # The transaction range does not match the upgrades done here, so do not report it.
            fix_result["dnf_event_start"] = fix_result["dnf_event_end"] = None
        return fix_result

    def _update_coldpatch_by_dnf_plugin(self, rpms: List[str]) -> Tuple[str, list]:
        """
        Upgrade every rpm of the list and return their upgrade log.

        Before a package is upgraded, compare_cve decides whether the upgrade is allowed;
        when it is not, the package is reported as failed with the comparison log.

        Args:
            rpms(list): List of packages that need to be upgraded

        Returns:
            Tuple[str, List[dict]]
            a tuple containing two elements (update result, Information about each package upgrade log).
            The update result is FAIL as soon as one package failed.
        """

        def fail_all(reason: str) -> list:
            # Every requested package is reported as failed with the same log.
            return [{"available_rpm": rpm, "result": TaskExecuteRes.FAIL, "log": reason} for rpm in rpms]

        query_status, fixable_cve_info = self._query_fixable_cve_info()
        if query_status != SUCCESS:
            return TaskExecuteRes.FAIL, fail_all(
                "Execution of CVE comparison failed due to failure to query fixable CVE information."
            )

        query_status, fixed_cve_info = self._query_fixed_cve_info_by_hotpatch()
        if query_status != SUCCESS:
            return TaskExecuteRes.FAIL, fail_all(
                "Execution of CVE comparison failed due to failure to query fixed CVE information."
            )

        overall_result = TaskExecuteRes.SUCCEED
        upgrade_records = []
        for rpm in rpms:
            upgrade_allowed, compare_log = self.compare_cve(rpm, fixable_cve_info, fixed_cve_info)
            if upgrade_allowed:
                rpm_result, rpm_log = self.__update_coldpatch(rpm)
            else:
                rpm_result, rpm_log = TaskExecuteRes.FAIL, compare_log

            if rpm_result == TaskExecuteRes.FAIL:
                overall_result = TaskExecuteRes.FAIL

            upgrade_records.append({"available_rpm": rpm, "result": rpm_result, "log": rpm_log})
        return overall_result, upgrade_records

    def __update_coldpatch(self, rpm: str) -> Tuple[str, str]:
        """
        Upgrade one rpm by dnf plugin (coldpatch).

        After a successful kernel upgrade the default grub kernel version is set as well;
        a failure of that step makes the whole upgrade fail.

        Args:
            rpm(str): package that need to be upgraded

        Returns:
            Tuple[str, str]
            a tuple containing two elements (upgrade result, package upgrade log).
        """
        exit_code, output, error_output = execute_shell_command([f"dnf upgrade-en {rpm} -y"])
        if exit_code != CommandExitCode.SUCCEED:
            LOGGER.error(error_output)
            return TaskExecuteRes.FAIL, output + error_output

        # Dropping the trailing "-version-release.arch" leaves the rpm name.
        is_kernel_package = rpm.rsplit("-", 2)[0] == "kernel"
        if is_kernel_package and not self.set_default_grub_kernel_version(rpm):
            return TaskExecuteRes.FAIL, output + error_output + "\nerror: set default kernel failed!"

        return TaskExecuteRes.SUCCEED, output + error_output

    def _update_hotpatch_by_dnf_plugin(self, rpms: List[str], accepted: bool) -> Tuple[str, list, int]:
        """
        Upgrade rpms by dnf plugin (hotpatch).

        Nothing is upgraded when the kernel consistency check fails. An upgrade counts as
        successful only when the command succeeds and reports "Apply hot patch succeed".

        Args:
            rpms(list): List of packages that need to be upgraded
            accepted(bool): whether a successfully applied hotpatch is additionally set to
                the "accept" status

        Returns:
            Tuple[str, List[dict], int]
            a tuple containing three elements (update result, Information about each package upgrade, upgrade count).
        """
        kernel_consistent, consistency_log = PreCheck.kernel_consistency_check()
        if not kernel_consistent:
            failed_records = [
                {
                    "available_rpm": rpm,
                    "result": TaskExecuteRes.FAIL,
                    "log": f"kernel consistency check failed\n{consistency_log}",
                }
                for rpm in rpms
            ]
            return TaskExecuteRes.FAIL, failed_records, 0

        overall_result = TaskExecuteRes.SUCCEED
        upgrade_records = []
        applied_count = 0

        for rpm in rpms:
            exit_code, output, error_output = execute_shell_command([f"dnf hotupgrade {rpm} -y"])
            record = {
                "available_rpm": rpm,
                "result": TaskExecuteRes.SUCCEED,
                "log": output + error_output,
            }

            applied = exit_code == CommandExitCode.SUCCEED and "Apply hot patch succeed" in output
            if not applied:
                record["result"] = TaskExecuteRes.FAIL
                overall_result = TaskExecuteRes.FAIL
            elif "Nothing to do" not in output:
                # Only upgrades that really changed something are counted.
                applied_count += 1

            if record["result"] == TaskExecuteRes.SUCCEED and accepted:
                try:
                    # Strip the trailing ".<arch>" and the leading "<prefix>-" from the rpm name.
                    hotpatch_name = rpm.rsplit(".", 1)[0].split("-", 1)[1]
                    _, status_set_log = self._set_hotpatch_status_by_dnf_plugin(hotpatch_name, "accept")
                    record["log"] += f"\n\n{status_set_log}"
                except IndexError as error:
                    LOGGER.error(error)
                    record["log"] += "\n\nhotpatch status set failed due to can't get correct hotpatch name!"
            upgrade_records.append(record)
        return overall_result, upgrade_records, applied_count

    @staticmethod
    def _query_fixable_cve_info() -> Tuple[str, dict]:
        """
        Query the CVEs fixed by the upgradeable version of each package.

        Returns:
            Tuple[status, dict]
            a tuple containing two elements (status code, fixable cve info grouped by rpm name and
            then by available rpm). An empty defaultdict is returned together with
            COMMAND_EXEC_ERROR when the dnf command fails.

        Example:
            "Succeed", {"kernel": {
                "kernel-5.10.0-60.91.0.115.oe2203.x86_64": ["CVE-2023-1829"],
                "kernel-5.10.0-60.91.0.116.oe2203.x86_64": ["CVE-2023-2006"]
                }}
        """
        exit_code, output, error_output = execute_shell_command(["dnf updateinfo list cves"])
        if exit_code != CommandExitCode.SUCCEED:
            LOGGER.error("Failed to query update info by dnf!")
            LOGGER.error(error_output)
            return COMMAND_EXEC_ERROR, defaultdict()

        cves_by_package = defaultdict(lambda: defaultdict(list))
        # Groups: 1 = cve id, 2 = severity (unused), 3 = available rpm.
        for match in re.finditer(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+)", output):
            cve_id, available_rpm = match.group(1), match.group(3)
            # Dropping the trailing "-version-release.arch" leaves the rpm name.
            package = available_rpm.rsplit("-", 2)[0]
            cves_by_package[package][available_rpm].append(cve_id)

        return SUCCESS, cves_by_package

    @staticmethod
    def _query_fixed_cve_info_by_hotpatch() -> Tuple[str, dict]:
        """
        Collect the CVEs that are fixed by installed hotpatches, grouped by rpm name.

        Returns:
            Tuple[status, dict]
            a tuple containing two elements (status code, fixed_cve_info). When the dnf command
            fails, COMMAND_EXEC_ERROR is returned together with an empty set.

        Example:
            "Succeed", {"kernel": {"CVE-2023-XXXX","CVE-2022-XXXX"}}
        """
        exit_code, output, error_output = execute_shell_command(["dnf hot-updateinfo list cves --installed"])
        if exit_code != CommandExitCode.SUCCEED:
            LOGGER.error("Failed to query fixed cves by hotpatch!")
            LOGGER.error(error_output)
            return COMMAND_EXEC_ERROR, set()

        cves_by_package = defaultdict(set)
        # Groups: 1 = cve id, 2 = severity, 3 = coldpatch column, 4 = hotpatch rpm.
        for match in re.finditer(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch\S+)", output):
            cve_id, hotpatch = match.group(1), match.group(4)
            # Cut the five trailing "-" separated fields, then the leading "patch-" (6 characters).
            package = hotpatch.rsplit("-", 5)[0][6:]
            cves_by_package[package].add(cve_id)

        return SUCCESS, cves_by_package

    def compare_cve(self, rpm: str, updated_info: dict, hotpatch_fixed_info: dict) -> Tuple[bool, str]:
        """
        Determine whether the packages to be upgraded covers the vulnerabilities fixed by the hotpatch

        Args:
            rpms(list): List of packages that need to be upgraded

        Returns:
            Tuple[bool, str]
            a tuple containing two elements (compare result, compare log).
        """
        compare_info = dict()
        upgraded_packages: set = self._query_upgraded_packages(rpm)
        if not upgraded_packages:
            return False, "Execution of CVE comparison failed due to failure to query upgraded_packages."
        for rpm in upgraded_packages:
            fixed_cve_by_coldpatch = set()
            rpm_name = rpm.rsplit("-", 2)[0]

            for update_rpm, cve_list in updated_info.get(rpm_name, {}).items():
                if rpm >= update_rpm:
                    fixed_cve_by_coldpatch.update(cve_list)
            cve_difference_set = hotpatch_fixed_info.get(rpm_name, set()) - fixed_cve_by_coldpatch
            if cve_difference_set:
                compare_info[rpm_name] = cve_difference_set

        if not compare_info.values():
            return True, ""

        log = (
            "After upgrading the package, vulnerabilities in the package or in its dependent software package "
            "may be re-exposed. \nHere are some specific vulnerabilities that could potentially re-exposed:\n"
        )
        for rpm_name, cve_info in compare_info.items():
            for cve_id in cve_info:
                log += f"{rpm_name}\t{cve_id}\n"
        return False, log

    @staticmethod
    def _query_upgraded_packages(package: str) -> Set[str]:
        """
        Resolve all packages to be upgraded and their dependencies, store them in a set and
        return it

        Args:
            packages(list): List of package that need to be upgraded

        Returns:
            set

        """
        package_set = set()
        if package.rsplit("-", 2)[0] == "kernel":
            package_set.add(package)
            return package_set

        # The exit code of the command is 1 when input parameters contains assumeno
        _, stdout, _ = execute_shell_command([f"dnf upgrade-en {package} --assumeno"])

        installed_rpm_info = re.findall(r"(Upgrading|Installing):(.*?)Transaction Summary", stdout, re.S)
        if not installed_rpm_info:
            return package_set

        installed_rpm_info_list = installed_rpm_info[0][1].strip().split("\n")
        for single_rpm_info in installed_rpm_info_list:
            # info_list example:
            # ['aops-ceres', 'aarch64', 'v1.3.4-5.oe2203sp2', '@commandline', '107 k]
            pkg_info_list = re.split(r'\s+', single_rpm_info.strip())
            if len(pkg_info_list) < 5:
                break
            package_set.add(f"{pkg_info_list[0]}-{pkg_info_list[2]}.{pkg_info_list[1]}")
        return package_set

    @staticmethod
    def set_default_grub_kernel_version(kernel_rpm_name: str) -> bool:
        """
        Make the given kernel the default boot entry

        The kernel image is looked up as /boot/vmlinuz-<suffix>, where <suffix> is the package
        name without its first seven characters (presumably the "kernel-" prefix), and is then
        handed to ``grubby --set-default``.

        Args:
            kernel_rpm_name(str): The name of the installed kernel package

        Returns:
            bool: True when grubby succeeds; False when the image is missing from /boot or
                the command fails
        """
        vmlinuz_path = os.path.join("/boot/", f"vmlinuz-{kernel_rpm_name[7:]}")
        if os.path.exists(vmlinuz_path):
            LOGGER.info("The Linux boot kernel is about to be changed")
            exit_code, _, error_output = execute_shell_command([f"grubby --set-default={vmlinuz_path}"])

            if exit_code == CommandExitCode.SUCCEED:
                LOGGER.info("The Linux boot kernel change successful")
                return True

            LOGGER.info("The Linux boot kernel change failed")
            LOGGER.error(error_output)
            return False

        LOGGER.error("Can't find target kernel in /boot when set default kernel")
        return False

    @staticmethod
    def _query_latest_dnf_transaction_id() -> Optional[int]:
        """Query latest dnf transaction id

        Takes the first row of ``dnf history`` that starts with a number and returns the value
        of its first column.

        Returns:
            int: the transaction id, or None when the command fails or its output is not
                a number (for example when it is empty)
        """
        # Example of command execution result:
        # [root@localhost ~]# dnf history
        # ID   | Command line   | Date and time       | Action(s)     | Altered
        # ---------------------------------------------------------------------
        # 3    | rm aops-ceres  | 2023-11-30 09:57    | Removed       | 1
        # 2    | install gcc    | 2023-11-30 09:57    | Install       | 1
        code, stdout, stderr = execute_shell_command(
            # Raw string: "\s" must reach grep unchanged and is not a valid Python escape.
            ["dnf history", r"grep -E '^\s*[0-9]+'", "head -1", "awk '{print $1}'"]
        )
        if code != CommandExitCode.SUCCEED:
            LOGGER.error(stderr)
            return None

        try:
            return int(stdout)
        except (TypeError, ValueError):
            # Nothing numeric was printed; report "no id" instead of raising.
            LOGGER.error(f"Failed to parse dnf transaction id from command output: {stdout!r}")
            return None

    @staticmethod
    def _set_hotpatch_status_by_dnf_plugin(hotpatch: str, operation: str) -> Tuple[bool, str]:
        """
        Switch the status of a hotpatch through the dnf hotpatch plugin

        Args:
            hotpatch(str):  hotpatch name which you want to change its status
            operation(str): the action that needs to be performed on this hot patch.
                            supported actions: apply,deactive,remove,active,accept
        Returns:
            Tuple[bool, str]
            a tuple containing two elements (operation result, operation log).
            The log is the standard output of the command followed by its standard error.
        """

        # The plugin expects "/ACC" or "/SGL" where the hotpatch name has "-ACC" or "-SGL".
        # Example: kernel-5.10.0-153.12.0.92.oe2203sp2-ACC-1-1 >> kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1
        plugin_patch_name = re.sub(r'-(ACC|SGL)', r'/\1', hotpatch)

        # Example of command execution result:
        # Succeed:
        # [root@openEuler ~]# dnf hotpatch --remove kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1
        # Last metadata expiration check: 3:24:16 ago on Wed 13 Sep 2023 08:16:17 AM CST.
        # Gonna remove this hot patch: kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1
        # remove hot patch 'kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1' succeed
        # Fail:
        # [root@openEuler ~]# dnf hotpatch --accept kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1
        # Last metadata expiration check: 3:25:24 ago on Wed 13 Sep 2023 08:16:17 AM CST.
        # Gonna accept this hot patch: kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1
        # accept hot patch 'kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1' failed, remain original status
        exit_code, output, error_output = execute_shell_command([f"dnf hotpatch --{operation} {plugin_patch_name}"])

        # A zero exit code alone is not enough: the output must not report "failed" either.
        succeeded = exit_code == CommandExitCode.SUCCEED and 'failed' not in output
        if not succeeded:
            LOGGER.error(f"hotpatch {hotpatch} set status failed!")

        return succeeded, output + error_output

    def remove_hotpatch(self, cves: List[str]) -> dict:
        """
        Remove the hotpatches that fix the given CVEs

        Every hotpatch matched by at least one of the CVEs is removed once, then the outcome
        is reported per CVE.

        Args:
            cves(list): List of CVE IDs fixed by hotpatch,e.g.
                ["CVE-XXXX-XXXX"]

        Returns:
            dict e.g
                {
                    "status": "fail/succeed",
                    "cves": [{
                        "cve_id": cve,
                        "result": "succeed",
                        "log": "rollback succeed"
                    }]
                }
            "status" is fail only when no hotpatch list could be queried at all; otherwise
            the outcome of each CVE is found in its own "result" field.
        """
        cve_to_patches = self._hotpatch_list_cve()
        if not cve_to_patches:
            unmatched = []
            for cve in cves:
                unmatched.append(
                    {"cve_id": cve, "log": "No valid hotpatch is matched.", "result": TaskExecuteRes.FAIL}
                )
            return {"status": TaskExecuteRes.FAIL, "cves": unmatched}

        # Several CVEs may share a hotpatch, so gather the distinct ones first.
        patches_to_remove = set()
        for cve in cves:
            patches_to_remove.update(cve_to_patches.get(cve, set()))

        removal_outcome = {}
        for patch in patches_to_remove:
            removed, removal_log = self._hotpatch_remove(patch)
            removal_outcome[patch] = {
                "result": TaskExecuteRes.SUCCEED if removed else TaskExecuteRes.FAIL,
                "log": removal_log,
            }

        per_cve_results = []
        for cve in cves:
            if cve in cve_to_patches:
                outcomes = [removal_outcome[patch] for patch in cve_to_patches.get(cve)]
                # A CVE counts as handled only if each of its hotpatches was removed.
                all_removed = all(outcome["result"] == TaskExecuteRes.SUCCEED for outcome in outcomes)
                per_cve_results.append(
                    {
                        "cve_id": cve,
                        "log": "\n".join(outcome["log"] for outcome in outcomes),
                        "result": TaskExecuteRes.SUCCEED if all_removed else TaskExecuteRes.FAIL,
                    }
                )
            else:
                per_cve_results.append(
                    {
                        "cve_id": cve,
                        "log": "No valid hot patch is matched.",
                        "result": TaskExecuteRes.FAIL,
                    }
                )

        return {"status": TaskExecuteRes.SUCCEED, "cves": per_cve_results}

    @staticmethod
    def _hotpatch_list_cve() -> Optional[dict]:
        """
        Run the dnf hot-updateinfo list cves command to query the hotpatch list corresponding to the cve

        Each CVE is mapped to the hotpatch printed on its last matching row. When that hotpatch
        is an ACC one, it is replaced by the newest hotpatch seen that shares the same name up
        to and including "ACC".

        Returns:
            dict: e.g
                {
                    "CVE-XXXX-XXX": {"patch 1", "patch 2"}
                }
            None is returned when the command fails or no hotpatch row is found.
        """

        def _version_key(name: str) -> list:
            # Ordering key built from the last two "-" separated fields of a hotpatch name.
            # Digit runs are compared as integers so that "ACC-1-10" is newer than "ACC-1-2",
            # which a plain string comparison gets wrong.
            return [
                [
                    (1, int(token), "") if token.isdigit() else (0, 0, token)
                    for token in re.findall(r"[0-9]+|[A-Za-z]+", field)
                ]
                for field in name.rsplit("-", 2)[1:]
            ]

        code, stdout, _ = execute_shell_command(["dnf hot-updateinfo list cves --installed", "grep patch"])
        if code != CommandExitCode.SUCCEED:
            LOGGER.error("Failed to query the hotpatch list.")
            return None

        all_cve_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch\S+)", stdout)
        if not all_cve_info:
            LOGGER.error("Failed to query the hotpatch list.")
            return None

        applied_hotpatch_info = {}
        latest_acc_hotpatch = {}
        for cve_id, _, _, hotpatch in all_cve_info:
            # A CVE listed on several rows keeps the hotpatch of its last row.
            applied_hotpatch_info[cve_id] = hotpatch
            series = hotpatch.rsplit("-", 2)[0]
            if series.endswith("ACC"):
                current = latest_acc_hotpatch.get(series)
                if current is None or _version_key(hotpatch) >= _version_key(current):
                    latest_acc_hotpatch[series] = hotpatch

        hotpatch_list = defaultdict(set)
        for cve_id, hotpatch in applied_hotpatch_info.items():
            # Names that are not in an ACC series are kept unchanged.
            hotpatch_list[cve_id].add(latest_acc_hotpatch.get(hotpatch.rsplit("-", 2)[0], hotpatch))

        return hotpatch_list

    def _hotpatch_remove(self, hotpatch: str) -> Tuple[bool, str]:
        """
        remove hotpatch package

        Args:
            hotpatch: hotpatch package which needs to remove

        Returns:
            Tuple[bool, str]
            a tuple containing two elements (remove result, remove log).
            The result is True only when ``dnf remove`` exits successfully; the log holds the
            executed command followed by its standard output and standard error.
        """
        cmd = [f"dnf remove {hotpatch} -y"]
        code, stdout, stderr = execute_shell_command(cmd)
        # Report the real outcome so that a failed removal is not presented as a success.
        removed = code == CommandExitCode.SUCCEED
        if not removed:
            LOGGER.error(f"Failed to remove hotpatch {hotpatch}.")
        return removed, f"Command:{cmd}\n\n{stdout}\n{stderr}\n"
